package cn.doitedu.ml.demo

import cn.doitedu.commons.util.SparkUtil
import org.apache.spark.ml.linalg
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * @date: 2020/2/17
 * @site: www.doitedu.cn
 * @author: hunter.d 涛哥
 * @qq: 657270652
 * @description: 在spark代码中，读取文件数据，生成Vector类型的RDD
  *   该案例中蕴含的思想和知识：
  *   1. 一个机器学习算法程序的开发套路： 读特征数据、加工特征数据、写计算算法、调用算法的结果
  *   2. 机器学习算法的效果是哪些因素有关： 特征的选取；特征加工（比如规范化、离散化、区间化）; 算法的选择；
  *   3. sparksql中如何自定义udf及使用
  *
 */
object File2VectorRDD {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkUtil.getSparkSession("vec_demo")
    import spark.implicits._

    // 读原始数据
    val ds: Dataset[String] = spark.read.textFile("userprofile/data/demo/persons.txt")

    // 加工原始形式String的数据，变成Vector类型的数据
    val df: DataFrame = ds.map(line=>{

      // 从字符串原始数据中，抽取出特征值double数组
      // u1,30,178,70,2000
      val arr: Array[String] = line.split(",")

      // 取出uid
      val uid = arr(0)

      // 取出特征值数组
      var features: Array[Double] = arr.tail.map(_.toDouble)

      // 将特征值数组封装成向量
      val vector = Vectors.dense(features)

      (uid,vector)
    }).toDF("uid","vec")


    // 计算人群中任意两人之间的欧氏距离相似度和余弦相似度
    // 结果形式：    u1    u2    0.283    0.566

    // 将向量rdd 自join
    val joined: DataFrame = df.join(df.toDF("b_uid","b_vec"),'uid < 'b_uid)
    /**
      * +---+------------------------+-----+------------------------+
      * |uid|vec                     |b_uid|b_vec                   |
      * +---+------------------------+-----+------------------------+
      * |u1 |[30.0,178.0,70.0,2000.0]|u2   |[32.0,168.0,60.0,2600.0]| row
      * |u1 |[30.0,178.0,70.0,2000.0]|u3   |[30.0,158.0,70.0,2800.0]| row
      * |u1 |[30.0,178.0,70.0,2000.0]|u4   |[34.0,188.0,78.0,3000.0]| row
      *
      */

    // 开发计算欧氏距离相似度的scala函数
    val eudisim = (v1:linalg.Vector,v2:linalg.Vector)=>{
      // 计算距离平方
      val sqdist = Vectors.sqdist(v1,v2)
      // 计算相似度
      1/(Math.pow(sqdist,0.5)+1)
    }

    // 开发计算余弦相似度的scala函数
    val cossim = (v1:linalg.Vector,v2:linalg.Vector)=>{

      val v1arr = v1.toArray
      val v2arr = v2.toArray
      // 求v1的模平方
      val m1: Double = v1arr.map(Math.pow(_,2)).sum
      // 求v2的模平方
      val m2: Double = v2arr.map(Math.pow(_,2)).sum

      // 求v1和v2的点乘
      val innerProduct: Double = v1arr.zip(v2arr).map(tp=>tp._1*tp._2).sum

      innerProduct/Math.pow(m1*m2,0.5)
    }

    // 然后将scala函数放在sparksql的计算中使用

    /**
      * 方式1：将scala函数，注册成sql函数
       */

    spark.udf.register("eudisim",eudisim)
    spark.udf.register("cossim",cossim)


    // 1.1 使用函数
    joined.selectExpr("uid","b_uid","eudisim(vec,b_vec) as sim_eudi","cossim(vec,b_vec) as sim_cos")
        .show(50,false)

    // 1.2 也可以这样用
    joined.createTempView("joined")
    spark.sql(
      """
        |
        |select
        |uid,
        |b_uid,
        |eudisim(vec,b_vec) as sim_eudi,
        |cossim(vec,b_vec) as sim_cos
        |
        |from joined
        |
      """.stripMargin)


    /**
      * 方式2：不用注册，而是将scala函数，用udf这个函数，将我们的自定义函数包装成UserDefinedFunction
      */
    //
    import org.apache.spark.sql.functions._
    val dsl_eudisim:UserDefinedFunction = udf(eudisim)
    val dsl_cossim:UserDefinedFunction = udf(cossim)
    // 在dsl风格的api中调用
    joined.select(
      'uid,
      'b_uid,
      dsl_eudisim('vec,'b_vec).as("sim_eudi"),
      dsl_cossim('vec,'b_vec).as("sim_cos")
    )

    // 补充小知识，sparksql 的select中参数类型（Column）的构造方法：
    joined.select('uid)    // 用这种方式：需要  import spark.implicits._
    joined.select($"uid")  // 用这种方式：需要  import spark.implicits._
    joined.select(col("uid")) // 需要 import org.apache.spark.sql.functions._
    joined.select(joined("uid"))

    spark.close()

  }
}
